from kafka import KafkaProducer
import json
from json import dumps

# Kafka配置
KAFKA_SERVERS = ['localhost:9092']  # 替换为你的Kafka服务器地址
TOPIC1 = 'topic3'

class KafkaProducerExample:
    def __init__(self):
        # 初始化生产者
        self.producer = KafkaProducer(
            bootstrap_servers=KAFKA_SERVERS,
            value_serializer=lambda x: dumps(x).encode('utf-8')
        )
    
    def send_message(self, user_id, person_name):
        """发送消息到topic1"""
        message = {
            "top3userid": user_id,
            "personpic": person_name
        }
        
        try:
            self.producer.send(TOPIC1, value=message)
            self.producer.flush()  # 确保消息被发送
            print(f"成功发送消息: {message}")
            return True
        except Exception as e:
            print(f"发送消息失败: {e}")
            return False
    
    def close(self):
        """关闭生产者连接"""
        self.producer.close()

if __name__ == "__main__":
    # 创建生产者实例
    producer = KafkaProducerExample()
    
    try:
        # 示例：发送几条测试消息
        

        producer.send_message("22222", "http://192.168.2.242/test1.jpg")
        producer.send_message("33333", "http://192.168.2.242/test1.jpg")
        producer.send_message("11111", "http://192.168.2.242/test1.jpg")
        print("所有消息已发送完成")
    finally:
        # 关闭生产者
        producer.close()

